package com.process;

import canal.util.ip.IPSeeker;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.base.MQBaseETL;
import com.bean.*;
import com.utils.DateStyle;
import com.utils.DateUtil;
import com.utils.KafkaUtil;
import com.utils.RedisUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import redis.clients.jedis.Jedis;

import java.util.Date;

/**
 * Real-time ETL job for shopping-cart events.
 *
 * <p>Consumes raw cart messages from the {@code ods_itcast_cart} Kafka topic, parses each message
 * into a {@link CartEntity}, widens it with dimension data looked up in Redis (goods, three
 * category levels, shop, shop city/province) and with the client location resolved from the
 * event IP, and finally publishes the widened record as JSON to the {@code dwd_cart} topic.
 *
 * <p>Sample input message:
 * {"addTime":1576479746005,"count":1,"goodsId":"100106","guid":"f1eeb1d9-9ee9-4eec-88da-61f87ab0302c","ip":"123.125.71.102","userId":"100208"}
 *
 * @author Sky
 * @since 2021/8/11 20:19
 */
public class CartETL {

    /**
     * Program entry point: builds and executes the cart ETL pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the pipeline cannot be built or the job execution fails
     */
    public static void main(String[] args) throws Exception {
        CartETL cartETL = new CartETL();
        cartETL.beforeCartDs();
    }

    /**
     * Builds the cart ETL topology and runs it under the job name {@code dwd_cart}.
     *
     * <p>Stages: Kafka source ({@code ods_itcast_cart}) -> parse to {@link CartEntity} -> widen to
     * {@link CartWideEntity} using Redis dimension hashes and the IP database -> serialize to
     * JSON -> print and send to the {@code dwd_cart} Kafka topic. The job runs with parallelism 1.
     *
     * @throws Exception if building or executing the Flink job fails
     */
    public void beforeCartDs() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Raw cart events, one JSON string per Kafka message.
        MQBaseETL baseEtl = new MQBaseETL();
        DataStream<String> cartDs = baseEtl.KafkaConsummer("ods_itcast_cart", env);

        // Parse the raw JSON into cart beans.
        DataStream<CartEntity> cartBeanDS = cartDs.map(new MapFunction<String, CartEntity>() {
            @Override
            public CartEntity map(String s) throws Exception {
                return CartEntity.getJsonString(s);
            }
        });

        // Widen each cart event with dimension data and client location.
        DataStream<CartWideEntity> cartWideBeanDS = cartBeanDS.map(new RichMapFunction<CartEntity, CartWideEntity>() {
            // Runtime-only handles: created in open(), so they must not be part of the serialized function.
            private transient Jedis jedis;
            private transient IPSeeker ipSeeker;

            @Override
            public void open(Configuration parameters) throws Exception {
                jedis = RedisUtil.getJedis().getResource();
                // Dimension hashes are read from Redis database index 1.
                jedis.select(1);
                // NOTE(review): the IP database location is hard-coded to a local Windows path;
                // consider shipping qqwry.dat via Flink's distributed cache instead.
                ipSeeker = new IPSeeker("qqwry.dat","C:\\IP");
            }

            @Override
            public CartWideEntity map(CartEntity cartEntity) throws Exception {
                CartWideEntity cartWideBean = CartWideEntity.getCartWideEntity(cartEntity);

                // Goods dimension.
                String goodsJSON = jedis.hget("foo_shop:dim_goods", cartWideBean.getGoodsId());
                DimGoodsDBEntity dimGoods = DimGoodsDBEntity.getGoodInfo(goodsJSON);

                // Category chain: level 3 -> level 2 -> level 1, following each parent id.
                String goodsCat3JSON = jedis.hget("foo_shop:dim_goods_cats", String.valueOf(dimGoods.getGoodsCatId()));
                DimGoodsCatDBEntity dimGoodCat3 = DimGoodsCatDBEntity.getDimGoodCat(goodsCat3JSON);
                String goodsCat2JSON = jedis.hget("foo_shop:dim_goods_cats", dimGoodCat3.getParentId());
                DimGoodsCatDBEntity dimGoodCat2 = DimGoodsCatDBEntity.getDimGoodCat(goodsCat2JSON);
                String goodsCat1JSON = jedis.hget("foo_shop:dim_goods_cats", dimGoodCat2.getParentId());
                DimGoodsCatDBEntity dimGoodCat = DimGoodsCatDBEntity.getDimGoodCat(goodsCat1JSON);

                // Shop dimension.
                String shopJSON = jedis.hget("foo_shop:dim_shops", String.valueOf(dimShops(dimGoods)));
                DimShopsDBEntity dimShops = DimShopsDBEntity.getDimShops(shopJSON);

                // Shop city, then its parent organisation (the province).
                String orgCityJSON = jedis.hget("foo_shop:dim_org", String.valueOf(dimShops.getAreaId()));
                DimOrgDBEntity dimOrgCity = DimOrgDBEntity.getDimOrg(orgCityJSON);
                String orgProvinceJSON = jedis.hget("foo_shop:dim_org", String.valueOf(dimOrgCity.getParentId()));
                DimOrgDBEntity dimOrg = DimOrgDBEntity.getDimOrg(orgProvinceJSON);

                // NOTE(review): the output is a fresh bean that receives only the IP and add-time
                // from the parsed event; any other fields set by getCartWideEntity are not carried
                // over. Confirm this is intended.
                CartWideEntity cartWideBeans = new CartWideEntity();
                cartWideBeans.setIp(cartWideBean.getIp());
                cartWideBeans.setGoodsPrice(dimGoods.getShopPrice());
                cartWideBeans.setGoodsName(dimGoods.getGoodsName());
                cartWideBeans.setGoodsCat3(dimGoodCat3.getCatName());
                cartWideBeans.setGoodsCat2(dimGoodCat2.getCatName());
                cartWideBeans.setGoodsCat1(dimGoodCat.getCatName());
                cartWideBeans.setShopId(String.valueOf(dimShops.getShopId()));
                cartWideBeans.setShopName(dimShops.getShopName());
                // Province fields come from the province record, not from the city record.
                cartWideBeans.setShopProvinceId(String.valueOf(dimOrg.getOrgId()));
                cartWideBeans.setShopProvinceName(dimOrg.getOrgName());
                cartWideBeans.setShopCityId(String.valueOf(dimOrgCity.getOrgId()));
                cartWideBeans.setShopCityName(dimOrgCity.getOrgName());

                // Resolve the client location from the IP and split it at the province marker "省".
                String country = ipSeeker.getCountry(cartWideBeans.getIp());
                String[] areaArray = country.split("省");
                if (areaArray.length > 1) {
                    cartWideBeans.setClientProvince(areaArray[0] + "省");
                    cartWideBeans.setClientCity(areaArray[1]);
                } else if (areaArray.length == 1) {
                    // No province marker found: keep the whole string as the province, city empty.
                    cartWideBeans.setClientProvince(areaArray[0]);
                    cartWideBeans.setClientCity("");
                }

                cartWideBeans.setAddTime(DateUtil.StringToString(cartWideBean.getAddTime(), DateStyle.YYYY_MM_DD_HH_MM));
                return cartWideBeans;
            }

            /** Returns the shop id of the given goods record (lookup key for the shop hash). */
            private Object dimShops(DimGoodsDBEntity goods) {
                return goods.getShopId();
            }

            @Override
            public void close() throws Exception {
                // Always release the Redis handle obtained in open().
                if (jedis != null) {
                    jedis.close();
                }
            }
        });

        // Kafka expects string payloads, so serialize the widened beans to JSON.
        DataStream<String> cartWideJsonDataStream = cartWideBeanDS.map(new MapFunction<CartWideEntity, String>() {
            @Override
            public String map(CartWideEntity cartWideEntity) throws Exception {
                return JSON.toJSONString(cartWideEntity, SerializerFeature.DisableCircularReferenceDetect);
            }
        });

        cartWideJsonDataStream.print();
        cartWideJsonDataStream.addSink(new RichSinkFunction<String>() {
            // One producer per sink instance, created in open() and released in close(),
            // instead of a new (never closed) producer for every record.
            private transient KafkaProducer<String, String> kafkaProducer;

            @Override
            public void open(Configuration parameters) throws Exception {
                kafkaProducer = new KafkaProducer<>(KafkaUtil.kafkaCons);
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                ProducerRecord<String, String> logs = new ProducerRecord<>("dwd_cart", null, value);
                kafkaProducer.send(logs);
            }

            @Override
            public void close() throws Exception {
                // close() also flushes any records still buffered by the producer.
                if (kafkaProducer != null) {
                    kafkaProducer.close();
                }
            }
        });

        env.execute("dwd_cart");
    }

}
















































